{
  "cells": [
    {
      "cell_type": "markdown",
      "id": "4548b5b3",
      "metadata": {
        "id": "4548b5b3"
      },
      "source": [
        "# Block Logging\n",
        "\n",
        "<a href=\"https://colab.research.google.com/github/lwa-project/bifrost/blob/master/tutorial/05_block_logging.ipynb\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open in Colab\"></a>\n",
        "\n",
        "In the previous section we looked at how to put blocks and rings together to build a simple pipeline.  In this section we will look at the tools Bifrost provides to analyze pipeline performance and how to integrate those into blocks.\n"
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "%%capture install_log\n",
        "# Import bifrost, but attempt to auto-install if needed (and we're running on\n",
        "# Colab). If something goes wrong, evaluate install_log.show() in a new block\n",
        "# to retrieve the details.\n",
        "try:\n",
        "  import bifrost\n",
        "except ModuleNotFoundError as exn:\n",
        "  try:\n",
        "    import google.colab\n",
        "  except ModuleNotFoundError:\n",
        "    raise exn\n",
        "  !sudo apt-get -qq install universal-ctags libopenblas-dev software-properties-common build-essential\n",
        "  !pip install -q contextlib2 pint simplejson scipy git+https://github.com/ctypesgen/ctypesgen.git\n",
        "  ![ -d ~/bifrost/.git ] || git clone https://github.com/lwa-project/bifrost ~/bifrost\n",
        "  !(cd ~/bifrost && ./configure --disable-cuda && make -j all && sudo make install)\n",
        "  import bifrost"
      ],
      "metadata": {
        "id": "OE9pYXXj-rkG"
      },
      "id": "OE9pYXXj-rkG",
      "execution_count": 1,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "\n",
        "Let's start with the `CopyOp` block from the previous section:"
      ],
      "metadata": {
        "id": "kaj6t0I--pz6"
      },
      "id": "kaj6t0I--pz6"
    },
    {
      "cell_type": "code",
      "execution_count": 2,
      "id": "84820717",
      "metadata": {
        "id": "84820717"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "import json\n",
        "import time\n",
        "import numpy\n",
        "import threading\n",
        "\n",
        "class CopyOp(object):\n",
        "    def __init__(self, iring, oring, ntime_gulp=250, guarantee=True, core=-1):\n",
        "        self.iring = iring\n",
        "        self.oring = oring\n",
        "        self.ntime_gulp = ntime_gulp\n",
        "        self.guarantee = guarantee\n",
        "        self.core = core\n",
        "        \n",
        "    def main(self):\n",
        "        with self.oring.begin_writing() as oring:\n",
        "            for iseq in self.iring.read(guarantee=self.guarantee):\n",
        "                ihdr = json.loads(iseq.header.tostring())\n",
        "                \n",
        "                print(\"Copy: Start of new sequence:\", str(ihdr))\n",
        "                \n",
        "                time_tag = ihdr['time_tag']\n",
        "                navg     = ihdr['navg']\n",
        "                nbeam    = ihdr['nbeam']\n",
        "                chan0    = ihdr['chan0']\n",
        "                nchan    = ihdr['nchan']\n",
        "                chan_bw  = ihdr['bw'] / nchan\n",
        "                npol     = ihdr['npol']\n",
        "                pols     = ihdr['pols']\n",
        "                pols     = pols.replace('CR', 'XY_real')\n",
        "                pols     = pols.replace('CI', 'XY_imag')\n",
        "\n",
        "                igulp_size = self.ntime_gulp*nbeam*nchan*npol*4        # float32\n",
        "                ishape = (self.ntime_gulp,nbeam,nchan,npol)\n",
        "                self.iring.resize(igulp_size, igulp_size*5)\n",
        "                \n",
        "                ogulp_size = igulp_size\n",
        "                oshape = ishape\n",
        "                self.oring.resize(ogulp_size)\n",
        "                \n",
        "                ohdr = ihdr.copy()\n",
        "                ohdr_str = json.dumps(ohdr)\n",
        "                \n",
        "                iseq_spans = iseq.read(igulp_size)\n",
        "                with oring.begin_sequence(time_tag=time_tag, header=ohdr_str) as oseq:\n",
        "                    for ispan in iseq_spans:\n",
        "                        if ispan.size < igulp_size:\n",
        "                            continue # Ignore final gulp\n",
        "                            \n",
        "                        with oseq.reserve(ogulp_size) as ospan:\n",
        "                           idata = ispan.data_view(numpy.float32)\n",
        "                           odata = ospan.data_view(numpy.float32)    \n",
        "                           odata[...] = idata"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "18cdb7e1",
      "metadata": {
        "id": "18cdb7e1"
      },
      "source": [
        "If we want to know how this block is performing we can add timing statements to it and record the values using Bifrost's `bifrost.proclog` functionality.  This provides a lightweight logger that records information to `/dev/shm/bifrost` where it can be accessed.\n",
        "\n",
        "To add logging to this block we would update it with:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 3,
      "id": "cfea96e1",
      "metadata": {
        "id": "cfea96e1"
      },
      "outputs": [],
      "source": [
        "from bifrost.proclog import ProcLog\n",
        "from bifrost.affinity import get_core, set_core\n",
        "\n",
        "class CopyOp(object):\n",
        "    def __init__(self, iring, oring, ntime_gulp=250, guarantee=True, core=-1):\n",
        "        self.iring = iring\n",
        "        self.oring = oring\n",
        "        self.ntime_gulp = ntime_gulp\n",
        "        self.guarantee = guarantee\n",
        "        self.core = core\n",
        "        \n",
        "        # ProcLog updates:\n",
        "        self.bind_proclog = ProcLog(type(self).__name__+\"/bind\")\n",
        "        self.in_proclog   = ProcLog(type(self).__name__+\"/in\")\n",
        "        self.out_proclog  = ProcLog(type(self).__name__+\"/out\")\n",
        "        self.size_proclog = ProcLog(type(self).__name__+\"/size\")\n",
        "        self.sequence_proclog = ProcLog(type(self).__name__+\"/sequence0\")\n",
        "        self.perf_proclog = ProcLog(type(self).__name__+\"/perf\")\n",
        "        \n",
        "        self.in_proclog.update(  {'nring':1, 'ring0':self.iring.name})\n",
        "        self.out_proclog.update( {'nring':1, 'ring0':self.oring.name})\n",
        "        self.size_proclog.update({'nseq_per_gulp': self.ntime_gulp})\n",
        "        \n",
        "    def main(self):\n",
        "        set_core(self.core)\n",
        "        self.bind_proclog.update({'ncore': 1, \n",
        "                                  'core0': get_core(),})\n",
        "        \n",
        "        with self.oring.begin_writing() as oring:\n",
        "            for iseq in self.iring.read(guarantee=self.guarantee):\n",
        "                ihdr = json.loads(iseq.header.tostring())\n",
        "                \n",
        "                self.sequence_proclog.update(ihdr)\n",
        "                \n",
        "                print(\"Copy: Start of new sequence:\", str(ihdr))\n",
        "                \n",
        "                time_tag = ihdr['time_tag']\n",
        "                navg     = ihdr['navg']\n",
        "                nbeam    = ihdr['nbeam']\n",
        "                chan0    = ihdr['chan0']\n",
        "                nchan    = ihdr['nchan']\n",
        "                chan_bw  = ihdr['bw'] / nchan\n",
        "                npol     = ihdr['npol']\n",
        "                pols     = ihdr['pols']\n",
        "                pols     = pols.replace('CR', 'XY_real')\n",
        "                pols     = pols.replace('CI', 'XY_imag')\n",
        "\n",
        "                igulp_size = self.ntime_gulp*nbeam*nchan*npol*4        # float32\n",
        "                ishape = (self.ntime_gulp,nbeam,nchan,npol)\n",
        "                self.iring.resize(igulp_size, igulp_size*5)\n",
        "                \n",
        "                ogulp_size = igulp_size\n",
        "                oshape = ishape\n",
        "                self.oring.resize(ogulp_size)\n",
        "                \n",
        "                ohdr = ihdr.copy()\n",
        "                ohdr_str = json.dumps(ohdr)\n",
        "                \n",
        "                prev_time = time.time()\n",
        "                iseq_spans = iseq.read(igulp_size)\n",
        "                with oring.begin_sequence(time_tag=time_tag, header=ohdr_str) as oseq:\n",
        "                    for ispan in iseq_spans:\n",
        "                        if ispan.size < igulp_size:\n",
        "                            continue # Ignore final gulp\n",
        "                        curr_time = time.time()\n",
        "                        acquire_time = curr_time - prev_time\n",
        "                        prev_time = curr_time\n",
        "                        \n",
        "                        with oseq.reserve(ogulp_size) as ospan:\n",
        "                            curr_time = time.time()\n",
        "                            reserve_time = curr_time - prev_time\n",
        "                            prev_time = curr_time\n",
        "                            \n",
        "                            idata = ispan.data_view(numpy.float32)\n",
        "                            odata = ospan.data_view(numpy.float32)    \n",
        "                            odata[...] = idata\n",
        "                            \n",
        "                        curr_time = time.time()\n",
        "                        process_time = curr_time - prev_time\n",
        "                        prev_time = curr_time\n",
        "                        self.perf_proclog.update({'acquire_time': acquire_time, \n",
        "                                                  'reserve_time': reserve_time, \n",
        "                                                  'process_time': process_time,})"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "5c784a96",
      "metadata": {
        "id": "5c784a96"
      },
      "source": [
        "In the `__init__` method of `CopyOp` we define all of the logging monitoring points:\n",
        "\n",
        " * `bind_proclog` — CPU and GPU binding information\n",
        " * `in_proclog` — input ring(s), if any\n",
        " * `out_proclog` — output ring(s), if any\n",
        " * `size_proclog` — gulp size in use\n",
        " * `sequence_proclog` — metadata for the current sequence being processed\n",
        " * `perf_proclog` — block performance timing, consisting of:\n",
        " * `acquire_time` — time spent waiting to read a span\n",
        " * `reserve_time` — time spect waiting to acquire an output span for writing\n",
        " * `process_time` — time spent processing data (basically everything else)\n",
        "\n",
        "For some of these monitoring points, like in, out, and size, the values are known when the block is created and they can be saved during initalization.  For others, like sequence and perf, they can only be determined while running inside `main`.\n",
        "\n",
        "In addition to these explict monitoring points, Bifrost also creates monitoring points for the rings that capture their names, sizes, and space.  All of this information is recorded to `/dev/shm/bifrost` in a per-process, per-block/ring fashion.  The path would be:\n",
        "\n",
        "    /dev/shm/bifrost/<control_PID>/<block_name>/<monitor_point>\n",
        "\n",
        "Bifrost provides tools, like the top-like utility `like_top.py`, that can tap into this information and provide real time information about a running pipeline.  The `bifrost.proclog` module also provides methods for accessing these data in a programmatic way."
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.6.9"
    },
    "colab": {
      "name": "05_block_logging.ipynb",
      "provenance": []
    }
  },
  "nbformat": 4,
  "nbformat_minor": 5
}
